使用netty实现并维护TCP长连接 您所在的位置:网站首页 java socket连接池框架 使用netty实现并维护TCP长连接

使用netty实现并维护TCP长连接

2023-08-16 07:30| 来源: 网络整理| 查看: 265

使用netty实现并维护TCP长连接 Netty是什么Netty的优点Netty为什么并发高 创建TCP长连接实例

Netty是什么

Netty 是一个利用 Java 的高级网络的能力,隐藏其背后的复杂性而提供一个易于使用的 API 的客户端/服务器框架。 Netty 是一个广泛使用的 Java 网络编程框架(Netty 在 2011 年获得了Duke’s Choice Award,见https://www.java.net/dukeschoice/2011)。它活跃和成长于用户社区,像大型公司 Facebook 和 Instagram 以及流行 开源项目如 Infinispan, HornetQ, Vert.x, Apache Cassandra 和 Elasticsearch 等,都利用其强大的对于网络抽象的核心代码。

Netty的优点

1.并发高 2.传输快 3.封装好

Netty为什么并发高

Netty是一款基于NIO(Nonblocking I/O,非阻塞IO)开发的网络通信框架,对比于BIO(Blocking I/O,阻塞IO),他的并发性能得到了很大提高

创建TCP长连接实例 该类的作用是创建客户端连接与创建该链接的监听并无限重连 mport java.util.concurrent.TimeUnit; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; public class CreateTCPConnection { private String host; //连接主机ip private Integer port; //连接主机端口 public EventLoopGroup group; public Channel channel; public ChannelFuture f; public CreateTCPConnection(String host, Integer port) { this.host = host; this.port = port; } /** * 创建TCP长连接并实例化channel */ public void connect() { if(group!=null){ group.shutdownGracefully(); //这里调用这个看看会不会释放 } group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) //1 设置reactor 线程 .option(ChannelOption.SO_KEEPALIVE, true) //1 设置通道选项 .channel(NioSocketChannel.class) //2 设置nio类型的channel .handler(new ClientChannelInitializer(CreateTCPConnection.this)); //3 装配流水线 //创建连接 f = b.connect(host, port); //检测并执行断线重连 f.addListener((ChannelFutureListener) channelFuture -> { if (!channelFuture.isSuccess()) { final EventLoop loop = channelFuture.channel().eventLoop(); loop.schedule(() -> { TdLog.error(host + " : 服务端链接不上,开始重连操作..."); connect(); //此处处理重连次数 }, 1L, TimeUnit.SECONDS); } else { channel = channelFuture.channel(); TdLog.info("服务端链接成功..."); } }); } catch (Exception e) { e.printStackTrace(); } } } 客户端通道配置类 import java.util.concurrent.TimeUnit; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.timeout.IdleStateHandler; /** * 类描述:TCP通道配置 */ public class ClientChannelInitializer extends ChannelInitializer { private CreateTCPConnection createTCPConnection; public ClientChannelInitializer(CreateTCPConnection createTCPConnection) { this.createTCPConnection = createTCPConnection; } @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //心跳检测 pipeline.addLast(new IdleStateHandler(0, 30, 0, TimeUnit.SECONDS)); //字符串编码(默认编码) pipeline.addLast("encoder", new StringEncoder()); //字符串解码(重写类,如不需要可使用默认) pipeline.addLast(new TCPBaseMessageDecoder()); //客户端的逻辑 pipeline.addLast("handler", new NettyClientHandler(createTCPConnection)); } } 重写decode类 import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import org.apache.commons.collections.CollectionUtils; /** * TCP消息解码器 * @author 07408 * */ @TdClassLog public class TCPBaseMessageDecoder extends ByteToMessageDecoder { private List byteList =new ArrayList(); @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) { //处理实际业务 while (in.isReadable()) { System.err.print((char)in.readByte()); } } } SimpleChannelInboundHandler实现类能管理并使用该客户端连接 import java.net.InetSocketAddress; import java.util.concurrent.TimeUnit; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.EventLoop; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; /** * 类描述:TCP客户端适配器 */ public class NettyClientHandler extends SimpleChannelInboundHandler { private CreateTCPConnection nettyClient; private int heartNumber; public NettyClientHandler(CreateTCPConnection createTCPConnection) { nettyClient = createTCPConnection; } //这个方法接收不到数据 @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg){} /** * 长连接读取数据方法-处理业务逻辑 * @param ctx * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIP = insocket.getAddress().getHostAddress(); System.out.println(clientIP+" Server say : " + msg.toString()); } /** * 通道连接时调用-处理业务逻辑 * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) { InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIP = insocket.getAddress().getHostAddress(); TdLog.error(clientIP + " :通道已连接!"); } /** * 通道闲置触发-启动断线重连功能 * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) { //使用过程中断线重连 final EventLoop eventLoop = ctx.channel().eventLoop(); eventLoop.schedule(() -> { TdLog.error("断线连接中..."); nettyClient.connect(); }, 1, TimeUnit.SECONDS); ctx.fireChannelInactive(); } /** * 心跳方法 * @param ctx * @param evt * @throws Exception */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state().equals(IdleState.READER_IDLE)) { System.out.println("READER_IDLE"); } else if (event.state().equals(IdleState.WRITER_IDLE)) { /**发送心跳,保持长连接*/ byte[] buff = {'V','Z',1,0,0,0,0,0}; String cmd = new String(buff); ctx.channel().writeAndFlush(cmd); } else if (event.state().equals(IdleState.ALL_IDLE)) { System.out.println("ALL_IDLE"); } } super.userEventTriggered(ctx, evt); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { super.handlerAdded(ctx); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { super.handlerRemoved(ctx); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); } } 本地调试测试连接能否正常使用 public static void main(String[] args) throws InterruptedException { CreateTCPConnection pu = new CreateTCPConnection("10.30.20.185", 8131); pu.connect(); }


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

      专题文章
        CopyRight 2018-2019 实验室设备网 版权所有